#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "chunk.h"
#include "bmap.h"
#include "metadata.h"
#include "net_table.h"
#include "configure.h"
#include "net_global.h"
#include "md_parent.h"
#include "../controller/stor_ctl.h"
#include "../replica/replica.h"
#include "locator_rpc.h"
#include "main_loop.h"
#include "../../ynet/sock/sock_udp.h"
#include "ylog.h"
#include "dbg.h"

typedef struct {
        int sd;
} locator_srv_t;

typedef struct {
        uint32_t magic;
        uint32_t crc;
        chkid_t chkid;
        fileid_t parent;
        uint32_t flag;
        char name[256];
        uuid_t uuid;
} locator_req_t;

typedef struct {
        uint32_t magic;
        uint32_t crc;
        nid_t nid;
        uuid_t uuid;
        int ret;
} locator_rep_t;

#define LOCATOR_RPC_MAGIC 0x866aa9f1
#define LOCATOR_RPC_NOCHECK 0x00001

static locator_srv_t *locator_srv;

typedef struct {
        int sd;
        uuid_t uuid;
        chkid_t chkid;
        fileid_t parent;
        struct sockaddr addr;
        socklen_t addrlen;
        char name[256];
} arg_t;

static void __locator_srv__(void *_arg)
{
        int ret;
        uint32_t _addr;
        locator_rep_t rep;
        arg_t *arg;
        poolid_t *parent;

        arg = _arg;

        if (chkid_isnull(&arg->parent)) {
                parent = NULL;
        } else {
                parent = &arg->parent;
        }

        ret = stor_ctl_lookup_srv(&arg->chkid, parent);
        if (unlikely(ret)) {
                _addr = ((struct sockaddr_in *)&arg->addr)->sin_addr.s_addr;
                if (ret != EREMCHG) {
                        DWARN("lookup "CHKID_FORMAT" from cluster %s/%s ret (%u) %s\n",
                              CHKID_ARG(&arg->chkid), arg->name, _inet_ntoa(_addr), ret, strerror(ret));
                }

                //ret = (ret == EREMCHG) ? ENOENT : ret;
        } else {
                _addr = ((struct sockaddr_in *)&arg->addr)->sin_addr.s_addr;
                DBUG("lookup "CHKID_FORMAT" from cluster %s/%s successful\n",
                      CHKID_ARG(&arg->chkid), arg->name, _inet_ntoa(_addr));
        }

        rep.ret = ret;
        rep.magic = LOCATOR_RPC_MAGIC;
        rep.nid = ng.local_nid;
        rep.crc = crc32_sum(&ng.local_nid, sizeof(ng.local_nid));
        uuid_copy(rep.uuid, arg->uuid);

        ret = sendto(arg->sd, &rep, sizeof(rep), 0, &arg->addr, arg->addrlen);
        if (ret < 0) {
                ret = errno;
                DWARN("ret (%u) %s\n", ret, strerror(ret));
        }

        yfree((void **)&_arg);
}

static int __locator_srv(int sd, const locator_req_t *req, const struct sockaddr *addr, socklen_t addrlen)
{
        int ret;
        uint32_t _addr;
        locator_rep_t rep;
        chkid_t parent;
        arg_t *arg;

#if 0
        ret = network_connect_master();
        if (ret) {
                GOTO(err_ret, ret);
        }
#endif
                
        if (net_isnull(net_getadmin())) {
                DWARN("not inited\n");
                goto out;
        }

        _addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;

        if (req->magic != LOCATOR_RPC_MAGIC) {
                DBUG("lookup "CHKID_FORMAT" from cluster %s/%s, magic %x\n",
                      CHKID_ARG(&req->chkid), req->name, _inet_ntoa(_addr), req->magic);
                goto out;
        }

        if (strcmp(req->name, gloconf.uuid)) {
                DBUG("lookup "CHKID_FORMAT" from cluster %s/%s\n",
                      CHKID_ARG(&req->chkid), req->name, _inet_ntoa(_addr));
                goto out;
        }                

        YASSERT(req->crc == crc32_sum(&req->chkid, sizeof(req->chkid)));

        ret = replica_srv_getparent(&req->chkid, &parent, NULL);
        if (unlikely(ret)) {
                DBUG("lookup "CHKID_FORMAT" from %s ret (%u) %s\n",
                      CHKID_ARG(&req->chkid), _inet_ntoa(_addr), ret, strerror(ret));
                rep.ret = ret;
                rep.magic = LOCATOR_RPC_MAGIC;
                rep.nid = ng.local_nid;
                rep.crc = crc32_sum(&ng.local_nid, sizeof(ng.local_nid));
                uuid_copy(rep.uuid, req->uuid);

                ret = sendto(sd, &rep, sizeof(rep), 0, addr, addrlen);
                if (ret < 0) {
                        ret = errno;
                        DWARN("ret (%u) %s\n", ret, strerror(ret));
                        GOTO(err_ret, ret);
                }
        } else {
                DBUG("lookup "CHKID_FORMAT" %x\n", CHKID_ARG(&req->chkid), req->flag);
                
                if (req->flag & LOCATOR_RPC_NOCHECK) {
                        rep.ret = 0;
                        rep.magic = LOCATOR_RPC_MAGIC;
                        rep.nid = ng.local_nid;
                        rep.crc = crc32_sum(&ng.local_nid, sizeof(ng.local_nid));
                        uuid_copy(rep.uuid, req->uuid);

                        ret = sendto(sd, &rep, sizeof(rep), 0, addr, addrlen);
                        if (ret < 0) {
                                ret = errno;
                                DWARN("ret (%u) %s\n", ret, strerror(ret));
                        }
                } else {
                        ret = ymalloc((void **)&arg, sizeof(*arg));
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        arg->sd = sd;
                        memcpy(arg->uuid,  req->uuid, sizeof(req->uuid));
                        arg->chkid = req->chkid;
                        arg->parent = req->parent;
                        arg->addr = *addr;
                        arg->addrlen = addrlen;
                        memcpy(arg->name, req->name, sizeof(req->name));

                        ret = main_loop_request(__locator_srv__, arg, "locator_srv");
                        if (unlikely(ret))
                                GOTO(err_free, ret);

                        DINFO("lookup "CHKID_FORMAT" from %s\n",
                              CHKID_ARG(&req->chkid), _inet_ntoa(_addr));
                }
        }

out:
        return 0;
err_free:
        yfree((void **)&arg);
err_ret:
        return ret;
}

static void *__locator_srv_worker(void *args)
{
        int ret;
        locator_req_t req;
        struct sockaddr addr;
        socklen_t addrlen;

        (void) args;

        while (1) {
                ret = sock_poll_sd(locator_srv->sd, 1000 * 1000, POLLIN);
                if (unlikely(ret)) {
                        if (ret == ETIMEDOUT || ret == ETIME)
                                continue;
                        else
                                GOTO(err_ret, ret);
                }

                addrlen = sizeof(struct sockaddr);
                ret = recvfrom(locator_srv->sd, &req, sizeof(req), 0, &addr, &addrlen);
                if (ret < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
                
                ret = __locator_srv(locator_srv->sd, &req, &addr, addrlen);
                if (unlikely(ret)) {
                        DWARN("ret (%u) %s\n", ret, strerror(ret));
                }
        }

        return NULL;
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

typedef vec_t(nid_t) vec_nid_t;

static int __nid_found(vec_nid_t *v, const nid_t *nid)
{
        int i, exist = 0;
        nid_t pos;

        vec_foreach(v, pos, i) {
                if (!nid_cmp(&pos, nid)) {
                        exist = 1;
                        break;
                }
        }

        if (!exist) {
                vec_push(v, *nid);
        }

        return exist;
}

static int __locator_rpc_lookup_wait(int sd, const uuid_t uuid, const chkid_t *chkid,
                                     nid_t *nid, uint64_t tmo)
{
        int ret, i, count, noent = 0;
        locator_rep_t *rep;
        char buf[MAX_BUF_LEN];
        uint64_t left, used;
        struct timeval t1, t2;
        vec_nid_t v;

        vec_init(&v);
        ret = cluster_countnode(&count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        left = tmo;
        i = 0;
        while (i < count) {
                _gettimeofday(&t1, NULL);
                
                ret = sock_poll_sd(sd, left, POLLIN);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                _gettimeofday(&t2, NULL);
                used = _time_used(&t1, &t2);
                left = left >= used ? left - used : 0;
                
                ret = recv(sd, buf, sizeof(*rep), 0);
                if (ret < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }

                rep = (void *)buf;
                YASSERT(ret == sizeof(*rep));
                YASSERT(rep->crc == crc32_sum(&rep->nid, sizeof(rep->nid)));
                YASSERT(rep->magic == LOCATOR_RPC_MAGIC);

                if (uuid_compare(uuid, rep->uuid)) {
                        continue;
                }

                if (__nid_found(&v, &rep->nid)) {
                        DINFO("lookup "CHKID_FORMAT" at %s repeat!!!\n",
                              CHKID_ARG(chkid), network_rname(&rep->nid));
                        continue;
                }

                if (rep->ret == 0) {
                        if (nid) {
                                *nid = rep->nid;
                                goto out;
                        }
                } else {
                        if (rep->ret == ENOENT) {
                                noent++;
                        } else {
                                DINFO("lookup "CHKID_FORMAT" at %s ret (%d) %s\n",
                                      CHKID_ARG(chkid), network_rname(&rep->nid),
                                      rep->ret, strerror(rep->ret));
                        }
                }

                if (!nid) {
                        printf("lookup "CHKID_FORMAT" at %s ret (%d) %s\n",
                                        CHKID_ARG(chkid), network_rname(&rep->nid),
                               rep->ret, strerror(rep->ret));
                }

                i++;
        }

        if (!nid)
                goto out;

        if (noent == count) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        } else {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

out:
        vec_deinit(&v);
        return 0;
err_ret:
        vec_deinit(&v);
        return ret;
}

static int __locator_rpc_lookup(int sd, const chkid_t *chkid,
                                const fileid_t *parent, nid_t *nid, uint64_t tmo, int force)
{
        int ret;
        locator_req_t req;

        (void) tmo;
        
        req.magic = LOCATOR_RPC_MAGIC;
        req.chkid = *chkid;
        req.crc = crc32_sum(chkid, sizeof(*chkid));
        req.flag = 0;
        if (parent)
                req.parent = *parent;
        else
                memset(&req.parent, 0x0, sizeof(fileid_t));
        strcpy(req.name, gloconf.uuid);
        uuid_generate(req.uuid);

        ret = udp_sock_boardcast(sd, netconf.network[0].network, netconf.network[0].mask,
                                 LICH_BOARDCAST_PORT, &req, sizeof(req));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (force) {
                ret = __locator_rpc_lookup_wait(sd, req.uuid, chkid, nid, tmo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = lease_get(chkid, nid, NULL);
                if (unlikely(ret)) {
                        ret = (ret == ENOKEY || ret == ETIME) ? EAGAIN : ret;
                        GOTO(err_ret, ret);
                }
        }
        
        return 0;
err_ret:
        return ret;
}


int locator_rpc_lookup(const chkid_t *chkid, nid_t *nid, int force)
{
        int ret, sd;
        char hostname[MAX_NAME_LEN];
        fileid_t parent;

        ret = md_parent_get1(chkid, &parent);
        if (ret) {
                DBUG("lookup "CHKID_FORMAT" parent not found ret:%d\n", CHKID_ARG(chkid), ret);
                memset(&parent, 0x0, sizeof(parent));
        } else {
                DBUG("lookup " CHKID_FORMAT" parent "CHKID_FORMAT"\n",
                     CHKID_ARG(chkid), CHKID_ARG(&parent));
        }
        
        ANALYSIS_BEGIN(0);
        
        ret = net_gethostname(hostname, MAX_NAME_LEN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = udp_sock_listen(&sd, hostname, NULL, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __locator_rpc_lookup(sd, chkid, &parent, nid, 5 * 1000 * 1000, force);
        if (unlikely(ret)) {
                DWARN("lookup "CHKID_FORMAT" ret (%d) %s\n", CHKID_ARG(chkid),
                      ret, strerror(ret));
                GOTO(err_sd, ret);
        } else {
                DINFO("lookup "CHKID_FORMAT" srv @ %s\n", CHKID_ARG(chkid),
                      network_rname(nid));
        }

        close(sd);

        ANALYSIS_END(0, IO_WARN, id2str(chkid));
        
        return 0;
err_sd:
        close(sd);
err_ret:
        ANALYSIS_END(0, IO_WARN, id2str(chkid));
        return ret;
}

int locator_rpc_boardcast(const chkid_t *chkid)
{
        int ret, sd;
        char hostname[MAX_NAME_LEN];

        ANALYSIS_BEGIN(0);
        
        ret = net_gethostname(hostname, MAX_NAME_LEN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = udp_sock_listen(&sd, hostname, NULL, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __locator_rpc_lookup(sd, chkid, NULL, NULL, 5 * 1000 * 1000, 1);
        if (unlikely(ret)) {
                DWARN("lookup "CHKID_FORMAT" ret (%d) %s\n", CHKID_ARG(chkid),
                      ret, strerror(ret));
                GOTO(err_sd, ret);
        }

        close(sd);

        ANALYSIS_END(0, IO_WARN, id2str(chkid));
        
        return 0;
err_sd:
        close(sd);
err_ret:
        ANALYSIS_END(0, IO_WARN, id2str(chkid));
        return ret;
}

int locator_rpc_init()
{
        int ret;
        char port[40];
        pthread_t th;
        pthread_attr_t ta;

        ret = ymalloc((void **)&locator_srv, sizeof(*locator_srv));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("locator_rpc_init\n");

        sprintf(port, "%d", LICH_BOARDCAST_PORT);

        ret = udp_sock_listen(&locator_srv->sd, NULL, port, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __locator_srv_worker, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
